home *** CD-ROM | disk | FTP | other *** search
- """This file implements all-or-nothing package transformations.
-
- An all-or-nothing package transformation is one in which some text is
- transformed into message blocks, such that all blocks must be obtained before
- the reverse transformation can be applied. Thus, if any blocks are corrupted
- or lost, the original message cannot be reproduced.
-
- An all-or-nothing package transformation is not encryption, although a block
- cipher algorithm is used. The encryption key is randomly generated and is
- extractable from the message blocks.
-
- This class implements the All-Or-Nothing package transformation algorithm
- described in:
-
- Ronald L. Rivest. "All-Or-Nothing Encryption and The Package Transform"
- http://theory.lcs.mit.edu/~rivest/fusion.pdf
-
- """
-
- __revision__ = "$Id: AllOrNothing.py,v 1.8 2003/02/28 15:23:20 akuchling Exp $"
-
- import operator
- import string
- from Crypto.Util.number import bytes_to_long, long_to_bytes
-
-
-
- class AllOrNothing:
- """Class implementing the All-or-Nothing package transform.
-
- Methods for subclassing:
-
- _inventkey(key_size):
- Returns a randomly generated key. Subclasses can use this to
- implement better random key generating algorithms. The default
- algorithm is probably not very cryptographically secure.
-
- """
-
- def __init__(self, ciphermodule, mode=None, IV=None):
- """AllOrNothing(ciphermodule, mode=None, IV=None)
-
- ciphermodule is a module implementing the cipher algorithm to
- use. It must provide the PEP272 interface.
-
- Note that the encryption key is randomly generated
- automatically when needed. Optional arguments mode and IV are
- passed directly through to the ciphermodule.new() method; they
- are the feedback mode and initialization vector to use. All
- three arguments must be the same for the object used to create
- the digest, and to undigest'ify the message blocks.
- """
-
- self.__ciphermodule = ciphermodule
- self.__mode = mode
- self.__IV = IV
- self.__key_size = ciphermodule.key_size
- if self.__key_size == 0:
- self.__key_size = 16
-
- __K0digit = chr(0x69)
-
- def digest(self, text):
- """digest(text:string) : [string]
-
- Perform the All-or-Nothing package transform on the given
- string. Output is a list of message blocks describing the
- transformed text, where each block is a string of bit length equal
- to the ciphermodule's block_size.
- """
-
- # generate a random session key and K0, the key used to encrypt the
- # hash blocks. Rivest calls this a fixed, publically-known encryption
- # key, but says nothing about the security implications of this key or
- # how to choose it.
- key = self._inventkey(self.__key_size)
- K0 = self.__K0digit * self.__key_size
-
- # we need two cipher objects here, one that is used to encrypt the
- # message blocks and one that is used to encrypt the hashes. The
- # former uses the randomly generated key, while the latter uses the
- # well-known key.
- mcipher = self.__newcipher(key)
- hcipher = self.__newcipher(K0)
-
- # Pad the text so that its length is a multiple of the cipher's
- # block_size. Pad with trailing spaces, which will be eliminated in
- # the undigest() step.
- block_size = self.__ciphermodule.block_size
- padbytes = block_size - (len(text) % block_size)
- text = text + ' ' * padbytes
-
- # Run through the algorithm:
- # s: number of message blocks (size of text / block_size)
- # input sequence: m1, m2, ... ms
- # random key K' (`key' in the code)
- # Compute output sequence: m'1, m'2, ... m's' for s' = s + 1
- # Let m'i = mi ^ E(K', i) for i = 1, 2, 3, ..., s
- # Let m's' = K' ^ h1 ^ h2 ^ ... hs
- # where hi = E(K0, m'i ^ i) for i = 1, 2, ... s
- #
- # The one complication I add is that the last message block is hard
- # coded to the number of padbytes added, so that these can be stripped
- # during the undigest() step
- s = len(text) / block_size
- blocks = []
- hashes = []
- for i in range(1, s+1):
- start = (i-1) * block_size
- end = start + block_size
- mi = text[start:end]
- assert len(mi) == block_size
- cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
- mticki = bytes_to_long(mi) ^ bytes_to_long(cipherblock)
- blocks.append(mticki)
- # calculate the hash block for this block
- hi = hcipher.encrypt(long_to_bytes(mticki ^ i, block_size))
- hashes.append(bytes_to_long(hi))
-
- # Add the padbytes length as a message block
- i = i + 1
- cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
- mticki = padbytes ^ bytes_to_long(cipherblock)
- blocks.append(mticki)
-
- # calculate this block's hash
- hi = hcipher.encrypt(long_to_bytes(mticki ^ i, block_size))
- hashes.append(bytes_to_long(hi))
-
- # Now calculate the last message block of the sequence 1..s'. This
- # will contain the random session key XOR'd with all the hash blocks,
- # so that for undigest(), once all the hash blocks are calculated, the
- # session key can be trivially extracted. Calculating all the hash
- # blocks requires that all the message blocks be received, thus the
- # All-or-Nothing algorithm succeeds.
- mtick_stick = bytes_to_long(key) ^ reduce(operator.xor, hashes)
- blocks.append(mtick_stick)
-
- # we convert the blocks to strings since in Python, byte sequences are
- # always represented as strings. This is more consistent with the
- # model that encryption and hash algorithms always operate on strings.
- return map(long_to_bytes, blocks)
-
-
- def undigest(self, blocks):
- """undigest(blocks : [string]) : string
-
- Perform the reverse package transformation on a list of message
- blocks. Note that the ciphermodule used for both transformations
- must be the same. blocks is a list of strings of bit length
- equal to the ciphermodule's block_size.
- """
-
- # better have at least 2 blocks, for the padbytes package and the hash
- # block accumulator
- if len(blocks) < 2:
- raise ValueError, "List must be at least length 2."
-
- # blocks is a list of strings. We need to deal with them as long
- # integers
- blocks = map(bytes_to_long, blocks)
-
- # Calculate the well-known key, to which the hash blocks are
- # encrypted, and create the hash cipher.
- K0 = self.__K0digit * self.__key_size
- hcipher = self.__newcipher(K0)
-
- # Since we have all the blocks (or this method would have been called
- # prematurely), we can calcualte all the hash blocks.
- hashes = []
- for i in range(1, len(blocks)):
- mticki = blocks[i-1] ^ i
- hi = hcipher.encrypt(long_to_bytes(mticki))
- hashes.append(bytes_to_long(hi))
-
- # now we can calculate K' (key). remember the last block contains
- # m's' which we don't include here
- key = blocks[-1] ^ reduce(operator.xor, hashes)
-
- # and now we can create the cipher object
- mcipher = self.__newcipher(long_to_bytes(key))
- block_size = self.__ciphermodule.block_size
-
- # And we can now decode the original message blocks
- parts = []
- for i in range(1, len(blocks)):
- cipherblock = mcipher.encrypt(long_to_bytes(i, block_size))
- mi = blocks[i-1] ^ bytes_to_long(cipherblock)
- parts.append(mi)
-
- # The last message block contains the number of pad bytes appended to
- # the original text string, such that its length was an even multiple
- # of the cipher's block_size. This number should be small enough that
- # the conversion from long integer to integer should never overflow
- padbytes = int(parts[-1])
- text = string.join(map(long_to_bytes, parts[:-1]), '')
- return text[:-padbytes]
-
- def _inventkey(self, key_size):
- # TBD: Not a very secure algorithm. Eventually, I'd like to use JHy's
- # kernelrand module
- import time
- from Crypto.Util import randpool
- # TBD: key_size * 2 to work around possible bug in RandomPool?
- pool = randpool.RandomPool(key_size * 2)
- while key_size > pool.entropy:
- pool.add_event()
-
- # we now have enough entropy in the pool to get a key_size'd key
- return pool.get_bytes(key_size)
-
- def __newcipher(self, key):
- if self.__mode is None and self.__IV is None:
- return self.__ciphermodule.new(key)
- elif self.__IV is None:
- return self.__ciphermodule.new(key, self.__mode)
- else:
- return self.__ciphermodule.new(key, self.__mode, self.__IV)
-
-
-
- if __name__ == '__main__':
- import sys
- import getopt
- import base64
-
- usagemsg = '''\
- Test module usage: %(program)s [-c cipher] [-l] [-h]
-
- Where:
- --cipher module
- -c module
- Cipher module to use. Default: %(ciphermodule)s
-
- --aslong
- -l
- Print the encoded message blocks as long integers instead of base64
- encoded strings
-
- --help
- -h
- Print this help message
- '''
-
- ciphermodule = 'AES'
- aslong = 0
-
- def usage(code, msg=None):
- if msg:
- print msg
- print usagemsg % {'program': sys.argv[0],
- 'ciphermodule': ciphermodule}
- sys.exit(code)
-
- try:
- opts, args = getopt.getopt(sys.argv[1:],
- 'c:l', ['cipher=', 'aslong'])
- except getopt.error, msg:
- usage(1, msg)
-
- if args:
- usage(1, 'Too many arguments')
-
- for opt, arg in opts:
- if opt in ('-h', '--help'):
- usage(0)
- elif opt in ('-c', '--cipher'):
- ciphermodule = arg
- elif opt in ('-l', '--aslong'):
- aslong = 1
-
- # ugly hack to force __import__ to give us the end-path module
- module = __import__('Crypto.Cipher.'+ciphermodule, None, None, ['new'])
-
- a = AllOrNothing(module)
- print 'Original text:\n=========='
- print __doc__
- print '=========='
- msgblocks = a.digest(__doc__)
- print 'message blocks:'
- for i, blk in map(None, range(len(msgblocks)), msgblocks):
- # base64 adds a trailing newline
- print ' %3d' % i,
- if aslong:
- print bytes_to_long(blk)
- else:
- print base64.encodestring(blk)[:-1]
- #
- # get a new undigest-only object so there's no leakage
- b = AllOrNothing(module)
- text = b.undigest(msgblocks)
- if text == __doc__:
- print 'They match!'
- else:
- print 'They differ!'
-